#########
## Nature Human Behaviour 
## Leading Countries in Global Science Increasingly Receive More Citations than Other Countries Despite Doing Similar Research.
## https://doi.org/10.1038/s41562-022-01351-5
## Harvard Dataverse (Code and Metadata): https://doi.org/10.7910/DVN/WCOINR 
## Step 0B
## Data: Data_20210905
#########

# At a terminal, run: 
# '''Note: '$1' is the discipline ID that you pass in.'''
# ml python/3.6.1
# export TRANSFORMERS_CACHE=/Huggingface/
# export GOOGLE_APPLICATION_CREDENTIALS="/Google_Translate/translation-309420-a1122a44fa1d.json"
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH
# ml py-ipython/6.1.0_py36 python/3.6.1 py-scipy/1.1.0_py36 py-scikit-learn/0.19.1_py36 py-pandas/0.23.0_py36 gcc/10.1.0 py-pytorch/1.4.0_py36
# ml py-numpy/1.17.2_py36
# srun python3 -u Step_X0B_Python3_Athena_MAG_Field_RAKE_and_GoogleAPI_Corpora.py "$1" 

# How to install Python modules: 
# https://pypi.org/project/PyAthena/
# PYTHONUSERBASE=$GROUP_HOME/python/lib/python3.6/python pip3 install --user PyAthena
# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user PyAthena
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user nltk
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user psutil
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user rake-nltk
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user smart_open
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user googletrans==4.0.0-rc1
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user goslate
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

###### easynmt installation
# PYTHONUSERBASE=$GROUP_HOME/python pip3 install -U easynmt
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# OR
# PYTHONUSERBASE=$GROUP_HOME/python/lib/python3.6/python pip3 install --user git+https://github.com/UKPLab/EasyNMT
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python/lib/python3.6/python pip3 install --user requests
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python/lib/python3.6/python pip3 install --user typing
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

####### Free Google Translate
# PYTHONUSERBASE=$GROUP_HOME/python/lib/python3.6/python pip3 install --user langdetect
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --user hyper
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --upgrade h2
# export PYTHONPATH=$GROUP_HOME/python/lib/python3.6/site-packages:$PYTHONPATH

####### Google API Translate (Paid)
# PYTHONUSERBASE=$GROUP_HOME/python pip3 install --upgrade google-cloud-translate


#############################
### Input
#############################
# Sociology - 144024400
# Atomic Physics - 184779094
# Information Dissemination - 2779494480
# Internal medicine - 126322002

# discipline = '144024400' | Sociology
# discipline = '147176958' | Civil Engineering
# discipline = '188147891' | EASY FAST 

import sys

# The single required CLI argument is the MAG field-of-study ID (e.g. 144024400
# for Sociology). It is substituted verbatim into Athena SQL, both in table
# names (temp_paper_<ID>) and in unquoted numeric comparisons, so only
# non-empty, all-ASCII-digit IDs are accepted.
if len(sys.argv) < 2 or not sys.argv[1] or not set(sys.argv[1]) <= set("0123456789"):
	sys.exit("Usage: python3 " + sys.argv[0] + " <numeric MAG field-of-study ID>")
discipline = str(sys.argv[1])

#############################
### Time Start
#############################
import time 
start_time = time.time()  # wall-clock start; the total run time is printed at the end of the script

#############################
### Modules
#############################
# EasyNMT Translation
# NOTE(review): `model_opus_mt` is not referenced anywhere else in this file;
# translation below goes through googletrans' `Translator`. Constructing the
# 'opus-mt' model presumably still costs start-up time and memory, and on
# fork-based platforms that memory is inherited by every multiprocessing worker
# created later. Confirm it is unused, then remove it or wire it in as an
# offline translation fallback.
from easynmt import EasyNMT
model_opus_mt = EasyNMT('opus-mt')

from pyathena import connect
import pandas as pd
import json
import numpy as np
import nltk
import string
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk import everygrams
import gc 
from nltk.stem import *
from nltk.tokenize import RegexpTokenizer
from sklearn.feature_extraction.text import CountVectorizer 
from nltk.stem.porter import *
from nltk.stem.snowball import SnowballStemmer
import sys
from collections import Counter
import random
import bz2 
import pickle
#import cPickle
import itertools
import multiprocessing as mp
from os import path
#from importlib import reload
import os 
import random
import time 
import re

import itertools
import psutil

from googletrans import Translator

from rake_nltk import Metric, Rake

import boto3
import smart_open

import goslate
import math
from langdetect import detect

#from google.cloud import translate_v2 as translate
# export GOOGLE_APPLICATION_CREDENTIALS="/home/users/cjgomez/Google_Translate/translation-309420-a1122a44fa1d.json"
#translate_client = translate.Client()


#############################
### Stopwords
#############################

# Project-specific stopwords: a header-less CSV, flattened into a single list of terms.
stopword_list_extended = list(itertools.chain.from_iterable(
	pd.read_csv("INPUT_Stopword_List.csv", header=None).values.tolist()
))

# NLTK English stopwords, every ASCII punctuation character, and hand-picked
# extras (generic verbs/connectives and publisher boilerplate such as 'elsevi',
# 'ltd'). Repeated entries are harmless because everything goes through a set.
stopword_list = set(stopwords.words('english'))
stopword_list.update(string.punctuation)
stopword_list.update([
	'', 'this', 'come', 'make', 'among', 'toward', 'put', 'use', 'during',
	'since', 'from', 'with', 'article', 'has', 'find', 'argue', 'also',
	'elsevi', 'ltd', 'use', 'abstract', 'paper', 'article', 'describe',
	'described',
])
stopword_list.update(stopword_list_extended)
stopword_list = list(stopword_list)

# Source | Academic Word List Coxhead (2000) and list of action verbs. 
# http://www.uefap.com/vocab/select/awl.htm
academic_stopwords = [
	term.strip()
	for term in pd.read_csv("INPUT_R_Academic_Stopwords.csv")["ACADEMIC_STOP_WORDS"].values.tolist()
]
stopword_list = list(set(stopword_list).union(academic_stopwords))

#############################
### Functions
#############################

def cpuStats():
    """Print CPU utilisation, system memory, and this process's memory in GiB.

    Three lines go to stdout:
      1. the value of `psutil.cpu_percent()`,
      2. the `psutil.virtual_memory()` named tuple,
      3. 'memory GB:' followed by the first field of this process's
         `memory_info()` (resident set size), divided by 2**30.
    """
    print(psutil.cpu_percent())
    print(psutil.virtual_memory())  # system-wide physical memory usage
    rss_bytes = psutil.Process(os.getpid()).memory_info()[0]
    print('memory GB:', rss_bytes / 2. ** 30)

def reduce_mem_usage(df):
	"""Downcast the numeric columns of *df* in place to save memory.

	* Signed integer columns become the narrowest of int8/int16/int32/int64
	  whose range contains the column's min and max.
	* Unsigned integer columns become the narrowest of uint8/uint16/uint32/uint64
	  that fits (previously they were wrongly turned into float16).
	* Float columns become float16 or float32 when min and max lie strictly
	  inside that type's finite range, otherwise float64.
	* Everything else (object, bool, datetime, timedelta, category and other
	  pandas extension dtypes) is left untouched. Bool columns used to be turned
	  into float16, and datetime/category columns used to raise on comparison.

	Note: float16 keeps only about 3 significant decimal digits, so the float
	downcast is lossy.

	Returns the same DataFrame object.
	"""
	signed_types = (np.int8, np.int16, np.int32, np.int64)
	unsigned_types = (np.uint8, np.uint16, np.uint32, np.uint64)

	for col in df.columns:
		col_type = df[col].dtype
		# Only plain NumPy numeric dtypes are handled; kind 'i' = signed int,
		# 'u' = unsigned int, 'f' = float.
		if not isinstance(col_type, np.dtype) or col_type.kind not in ('i', 'u', 'f'):
			continue

		c_min = df[col].min()
		c_max = df[col].max()

		if col_type.kind in ('i', 'u'):
			candidates = signed_types if col_type.kind == 'i' else unsigned_types
			for candidate in candidates:
				info = np.iinfo(candidate)
				# Inclusive bounds: e.g. 127 still fits in int8.
				if info.min <= c_min and c_max <= info.max:
					df[col] = df[col].astype(candidate)
					break
		else:
			for candidate in (np.float16, np.float32):
				info = np.finfo(candidate)
				if info.min < c_min and c_max < info.max:
					df[col] = df[col].astype(candidate)
					break
			else:
				# Out of range, infinite, or all-NaN: keep full precision.
				df[col] = df[col].astype(np.float64)
	return df

def removeInfrequentAbstract(abstract):
	"""Count the 1- to 3-grams of *abstract* and keep only those seen more than once.

	*abstract* is an iterable of strings. They are joined with single spaces and
	tokenised by the analyzer of a default scikit-learn CountVectorizer (only
	``ngram_range`` is changed) before n-grams are counted.

	Returns a Counter mapping each repeated n-gram to its count.
	"""
	tokenize_ngrams = CountVectorizer(ngram_range=(1, 3)).build_analyzer()
	ngram_counts = Counter(tokenize_ngrams(" ".join(abstract)))
	return Counter({gram: count for gram, count in ngram_counts.items() if count > 1})

def divide_chunks(l, n): 
	"""Yield consecutive slices of *l* holding *n* items each; the final slice may be shorter."""
	yield from (l[start:start + n] for start in range(0, len(l), n))

def split_dict(x, chunks):
	"""Deal the items of mapping *x* round-robin into *chunks* new dicts.

	The k-th item, in iteration order, lands in dict number ``k % chunks``, so
	the dict sizes differ by at most one. Returns the list of dicts.
	"""
	buckets = [{} for _ in range(chunks)]
	next_bucket = itertools.cycle(range(chunks))
	for key, value in x.items():
		buckets[next(next_bucket)][key] = value
	return buckets

def fieldIDString(x,input_discipline):
	"""Return SQL template *x* with every ``FIELDIDHERE`` placeholder replaced by *input_discipline*."""
	placeholder = "FIELDIDHERE"
	filled_sql = x.replace(placeholder, input_discipline)
	return filled_sql

def currentMemoryUsage():
	"""Print this process's memory (first field of psutil's memory_info(), i.e. RSS) in GiB.

	Output format: ``Current Memory Usage: <float> GB``.
	"""
	rss_gib = psutil.Process(os.getpid()).memory_info()[0] / 2. ** 30
	print("Current Memory Usage: {} GB".format(rss_gib))

def fastAthenaQuery(input_sql, input_discipline, input_chunksize = None):
	"""Run an Athena query and load its CSV result straight from S3 into pandas.

	Streaming the result file with smart_open is much faster than having
	PyAthena fetch a few rows at a time through the API.

	Args:
		input_sql: SQL template containing the ``FIELDIDHERE`` placeholder.
		input_discipline: field-of-study ID substituted for the placeholder.
		input_chunksize: None to read the whole result into one DataFrame;
			otherwise the row count per chunk passed to ``pd.read_csv``.

	Returns:
		A DataFrame, or the chunk iterator returned by
		``pd.read_csv(..., chunksize=input_chunksize)``.

	Uses the module-level Athena ``cursor``, which must exist before the call.
	After loading, it asks boto3 to delete the result object (see NOTE below).
	"""
	cursor.execute(fieldIDString(input_sql,input_discipline))
	# cursor.output_location is "s3://<bucket>/<key>". The (redacted) prefix
	# turns it into the full path with credentials that smart_open reads.
	complete_s3_path = 'REDCATED' + cursor.output_location.split("s3://")[1]

	if input_chunksize is None:
		# read_csv consumes the entire stream here, so the handle can be closed right away.
		with smart_open.smart_open(complete_s3_path) as result_stream:
			outputfile = pd.read_csv(result_stream)
	else:
		# NOTE(review): this reader is lazy, but the S3 object is deleted below
		# before the caller iterates it. That only works if the already-open
		# stream keeps serving the download. Confirm, or delete after iteration.
		outputfile = pd.read_csv(smart_open.smart_open(complete_s3_path),chunksize=input_chunksize)

	# Delete the file from the s3 bucket.
	# NOTE(review): the key passed to s3.Object() is complete_s3_path, which
	# still carries the credentials prefix and bucket name. S3 deletes of missing
	# keys succeed silently, so verify this really removes the result file.
	s3 = boto3.resource("s3",
		aws_access_key_id='REDACTED',
		aws_secret_access_key= 'REDACTED')
	obj = s3.Object("REDACTED",
		complete_s3_path)
	obj.delete()

	return outputfile

###############################
### Test If Abstract DataFrames Exist
###############################
# Resume checkpoints written by the first stage below:
#   - the English frame, already RAKE-processed;
#   - the non-English frame, raw text still to be translated.
# If both files exist, the Athena download, language detection and English
# RAKE steps are skipped and both frames are loaded from disk instead.
TEMP_English_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename = "TEMP_English_OUTPUT_Python_MAG_Field_Corpus_RAKE_and_GoogleAPI_"+str(discipline)+".pbz2"

TEMP_NonEnglish_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename = "TEMP_NonEnglish_OUTPUT_Python_MAG_Field_Corpus_RAKE_and_GoogleAPI_"+str(discipline)+".pbz2"

if not os.path.exists(TEMP_English_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename) or not os.path.exists(TEMP_NonEnglish_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename):

	###############################
	### Connections
	###############################

	# `cursor` runs the CREATE TABLE statements and is the global used inside
	# fastAthenaQuery(); `conn` backs the pd.read_sql() existence checks.
	cursor = connect(aws_access_key_id='REDACTED',
					 aws_secret_access_key='REDACTED',
					 s3_staging_dir='REDACTED',
					 region_name='REDACTED').cursor()

	conn = connect(aws_access_key_id='REDACTED',
					 aws_secret_access_key='REDACTED',
					 s3_staging_dir='REDACTED',
					 region_name='REDACTED')


	###############################
	### Paper ID -- Year and Title
	###############################

	# Yields a row only if the staging table for this discipline already exists.
	check_paper_query = '''
	select true as returnTrue where EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'temp_paper_FIELDIDHERE');
	'''

	# Not executed by this script.
	drop_paper_query = '''
	drop table if exists mag_staging.temp_paper_FIELDIDHERE;
	'''

	# Materialise id, title, year and doc type of every paper tagged with this field of study.
	paper_query = '''
	create table mag_staging.temp_paper_FIELDIDHERE 
	WITH (
		format = 'TEXTFILE',
		field_delimiter = '\t'
	  )
	as 
	SELECT Paper_Table.paperid as PaperID,
		   Paper_Table.papertitle as Title,
		   Paper_Table.full_year as Year,
		   Paper_Table.doctype as DocType
	FROM (
	  Select Paper_Discipline_Table.paperid, 
			 Discipline_Table.displayname
	  FROM "mag_data"."paperfieldsofstudy" as Paper_Discipline_Table
	  JOIN "mag_data"."fieldsofstudy" as Discipline_Table
	  ON Discipline_Table.fieldofstudyid = Paper_Discipline_Table.fieldofstudyid
	  WHERE Discipline_Table.fieldofstudyid = FIELDIDHERE) as Paper_Discipline_Key_Table
	JOIN "mag_data"."papers" as Paper_Table
	ON Paper_Table.paperid = Paper_Discipline_Key_Table.paperid;
	'''

	if pd.read_sql(fieldIDString(check_paper_query,discipline), conn)["returnTrue"].empty:
		cursor.execute(fieldIDString(paper_query,discipline))

	################################
	## Paper ID Year and Title
	################################

	# NOTE(review): df_year is not used by the active code in this script.
	read_paper_query = '''
	SELECT * FROM mag_staging.temp_paper_FIELDIDHERE
	'''
	df_year = fastAthenaQuery(read_paper_query,discipline,None)
	df_year = df_year[["paperid","year"]]

	gc.collect()

	################################
	## Abstract Text
	################################
	check_abstract_query = '''
	select true as returnTrue where EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = 'temp_abstract_FIELDIDHERE');
	'''

	# Not executed by this script.
	drop_abstract_query = '''
	drop table if exists mag_staging.temp_abstract_FIELDIDHERE
	'''

	abstract_query = '''
	create table mag_staging.temp_abstract_FIELDIDHERE
	WITH (
		format = 'TEXTFILE',
		field_delimiter = '\t'
	  )
	as
	SELECT indexedabstract,
		   paperid as PaperID,
		   fieldofstudyid as FieldID
	FROM "mag_data"."test6_paii"
	WHERE fieldofstudyid = FIELDIDHERE
	'''

	if pd.read_sql(fieldIDString(check_abstract_query,discipline), conn)["returnTrue"].empty:
		cursor.execute(fieldIDString(abstract_query,discipline))

	######################################
	#### Process Abstract Text
	######################################

	read_abstract_query = '''
	SELECT * FROM mag_staging.temp_abstract_FIELDIDHERE
	'''

	# Read in 1,000,000-row chunks. The field-ID column is constant (the query
	# filters on a single field), so it is dropped from every chunk.
	chunksize_input = 10 ** 6
	df_abstract_list = []
	for df_abstract_ in fastAthenaQuery(read_abstract_query,discipline,chunksize_input):
		del df_abstract_["fieldid"]
		gc.collect()
		df_abstract_list.append(df_abstract_)
		del df_abstract_
		gc.collect()

	df_abstract = pd.concat(df_abstract_list)
	# pd.concat copied the chunks; release the originals so they are not held twice.
	del df_abstract_list
	gc.collect()


	######################################
	#### Create Abstracts
	######################################
	# Translation table deleting ASCII punctuation; built once, not once per word.
	punctuation_table = str.maketrans('', '', string.punctuation)

	def create_text_and_langauge(indexedabstract_input):
		"""Rebuild plain abstract text from a MAG inverted-index JSON string.

		The JSON's "InvertedIndex" maps each word to the positions where it
		occurs. Words are stripped of surrounding whitespace, lower-cased, have
		ASCII punctuation removed, and are emitted in position order, each
		followed by one space.

		Returns "" when:
		  - the abstract has fewer than 30 word positions;
		  - the value is missing (not a string);
		  - the JSON cannot be parsed or lacks "InvertedIndex".
		Language detection then tags such rows "REMOVE" (IsEnglish's fallback
		when detection fails).
		"""
		if not isinstance(indexedabstract_input, str):
			return ""  # e.g. NaN for a paper without an abstract
		try:
			inverted_index = json.loads(indexedabstract_input)["InvertedIndex"]
		except (ValueError, KeyError, TypeError):
			return ""
		index = {k: str(oldk).strip().lower().translate(punctuation_table) for oldk, oldv in inverted_index.items() for k in oldv}

		if len(index) < 30:
			return ""

		# Walk the positions that actually occur, in order. The old
		# range(len(index)) loop silently dropped every word whose position was
		# >= the number of distinct positions whenever the index had gaps.
		return "".join(index[position] + " " for position in sorted(index))

	create_text_and_langauge_FF = np.frompyfunc(create_text_and_langauge,1,1)

	df_abstract["Abstract"] = create_text_and_langauge_FF(df_abstract["indexedabstract"])

	del df_abstract["indexedabstract"]
	gc.collect()
	gc.collect()

	######################################
	#### Extract Language
	######################################
	def IsEnglish(x):
		"""Return langdetect's language code for *x* (e.g. 'en'), or "REMOVE" if detection fails."""
		try:
			return detect(x)
		except Exception:
			return "REMOVE"

	def Parallel_IsEnglish(df_abstract_input):
		"""Pool worker: detect the language of every abstract in one Series slice."""
		print("Started")
		return df_abstract_input.apply(lambda x: IsEnglish(x))

	# langdetect is non-deterministic unless its seed is fixed. Seeding makes
	# repeated runs assign the same language to the same abstract. It is set
	# before the pool is created so fork-started workers inherit it.
	from langdetect import DetectorFactory
	DetectorFactory.seed = 0

	number_of_CPUs = 15
	print("Number of CPUs: "+str(number_of_CPUs))

	# Only the pool is guarded. IsEnglish_List is used unconditionally below,
	# so this script is meant to be executed directly, not imported.
	if __name__ == "__main__":
		IsEnglish_List = []
		# Use Pool to split up the corpus into number_of_CPUs slices and detect the language of each.
		pool = mp.Pool(number_of_CPUs)
		for index_, result_ in enumerate(pool.imap(Parallel_IsEnglish, np.array_split(df_abstract["Abstract"],number_of_CPUs))):
			print("Corpus Identify Language | "+str(number_of_CPUs-index_)+" "+str(psutil.virtual_memory()[2]))
			cpuStats()
			IsEnglish_List.append(result_)
		pool.close()
		pool.join()

	df_abstract["Is_English"] = pd.DataFrame(pd.concat(IsEnglish_List)).rename(columns={"Abstract":"Is_English"})

	del IsEnglish_List
	gc.collect()
	gc.collect()

	######################################
	#### Translate Language
	######################################

	############
	### English
	############
	# .copy() gives an independent frame, so the column assignment below does not
	# trigger pandas' SettingWithCopyWarning.
	df_abstract_English = df_abstract.query("Is_English=='en'").copy()

	def translate_English(x):
		"""Return RAKE-ranked key phrases (1-3 words) of English text *x*, or [] on failure.

		No translation happens here; the name mirrors translate_NonEnglish.
		"""
		try:
			r = Rake(min_length=1,max_length=3,stopwords=stopword_list,punctuations=string.punctuation) # Uses stopwords for english from NLTK, and all puntuation characters.
			r.extract_keywords_from_text(x)
			return r.get_ranked_phrases()
		except Exception:
			return []

	translate_English_FF = np.frompyfunc(translate_English,1,1)

	def Parallel_translate_English_FF(x):
		"""Pool worker: run RAKE over one Series slice of English abstracts."""
		return translate_English_FF(x)

	number_of_CPUs = 15
	print("Number of CPUs: "+str(number_of_CPUs))

	if __name__ == "__main__":
		translate_English_FF_List = []
		# Use Pool to split up the corpus into number_of_CPUs slices and run RAKE on each.
		pool = mp.Pool(number_of_CPUs)
		for index_, result_ in enumerate(pool.imap(Parallel_translate_English_FF, np.array_split(df_abstract_English["Abstract"],number_of_CPUs))):
			print("English Corpus RAKE | "+str(number_of_CPUs-index_)+" "+str(psutil.virtual_memory()[2]))
			currentMemoryUsage()
			translate_English_FF_List.append(result_)
		pool.close()
		pool.join()

	df_abstract_English["Abstract"] = pd.DataFrame(pd.concat(translate_English_FF_List))

	del translate_English_FF_List
	gc.collect()
	gc.collect()

	######################################
	#### Save English DataFrame
	######################################

	# English (the `with` block closes the file)
	with bz2.BZ2File(TEMP_English_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename, 'w') as f:
		pickle.dump(df_abstract_English,f, protocol=2) # Python2 only accepts 0,1, or 2

	# Non-English: raw text still to be translated. Rows tagged "REMOVE"
	# (language detection failed) are dropped. .copy() so the later "Abstract"
	# assignment targets an independent frame.
	df_abstract_NonEnglish = df_abstract.query("Is_English!='en' and Is_English!='REMOVE'").copy()
	with bz2.BZ2File(TEMP_NonEnglish_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename, 'w') as f:
		pickle.dump(df_abstract_NonEnglish,f, protocol=2) # Python2 only accepts 0,1, or 2

	del df_abstract
	gc.collect()
	gc.collect()

else:
	# Resume from the checkpoints written by a previous run. encoding='latin1'
	# lets pickle.load also read protocol-2 pickles written by Python 2.
	# These files are produced by this script itself.
	# English
	with bz2.BZ2File(TEMP_English_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename,'rb') as englishf:
		df_abstract_English = pickle.load(englishf,encoding='latin1') #For reading in from Python2 Pickle

	# Non-English
	with bz2.BZ2File(TEMP_NonEnglish_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename,'rb') as nonenglishf:
		df_abstract_NonEnglish = pickle.load(nonenglishf,encoding='latin1') #For reading in from Python2 Pickle

###############################
### Test Google API Connection
###############################

# Smoke-test the free Google Translate endpoint before the long batch job.
# An error containing "429" (rate limited) aborts the run with exit status 1,
# so the batch scheduler records a failure. Any other error is printed and the
# check continues.
test_translation = ["This is the first test to see if the Google API Connection works.","This is the second test to see if the Google API Connection works.","This is the third test to see if the Google API Connection works."]

for x in test_translation:
	try:
		translator = Translator()
		# googletrans 3.x reads `raise_Exception`; 4.0.0rc1 (pinned in the install
		# notes) reads `raise_exception`. Without it, a failed request silently
		# returns the input text and the 429 check below never fires. Set both.
		translator.raise_Exception = True
		translator.raise_exception = True
		print(translator.translate(x).text)
		time.sleep(5)
	except Exception as e: 
		print(str(e))
		if "429" in str(e):
			sys.exit(1)

############
### Non English Translation
############

def translate_NonEnglish(x):
	"""Translate one abstract to English with Google Translate, then extract RAKE key phrases.

	Sleeps 2 s before each request to throttle calls to the free endpoint.

	Returns the RAKE-ranked phrases (1-3 words; stopword_list and ASCII
	punctuation act as delimiters) of the translated text, or [] when
	translation or extraction fails.

	Exits the whole program with status 1 if the error mentions HTTP 429
	(rate limited), since continuing would most likely keep failing.
	"""
	try:
		RAKE_Translate = Rake(min_length=1,max_length=3,stopwords=stopword_list,punctuations=string.punctuation) # Uses stopwords for english from NLTK, and all puntuation characters.
		time.sleep(2)
		translator = Translator()
		# googletrans 3.x reads `raise_Exception`; 4.0.0rc1 (pinned in the install
		# notes) reads `raise_exception`. Without it, a failed request silently
		# returns the untranslated input. Set both so HTTP errors raise.
		translator.raise_Exception = True
		translator.raise_exception = True
		RAKE_Translate.extract_keywords_from_text(translator.translate(x).text)
		return RAKE_Translate.get_ranked_phrases()
	except Exception as e: 
		print(str(e))
		if "429" in str(e):
			# Exit the program (non-zero status) if the connection is rate limited / lost
			sys.exit(1)
		return []

# Translate the non-English abstracts in batches of at most 100 rows, pausing
# 100 s between batches (presumably to stay under Google Translate's rate limits).
# The number of sections passed to np.array_split must be >= 1; the previous
# shape[0]/100 truncated to 0 (ValueError) for fewer than 100 rows.
n_NonEnglish = df_abstract_NonEnglish.shape[0]
n_batches = max(1, math.ceil(n_NonEnglish / 100))

translate_NonEnglish_List = []
if n_NonEnglish > 0:
	for index, df_input in enumerate(np.array_split(df_abstract_NonEnglish["Abstract"], n_batches)):

		start_translation_time = time.time()

		translate_NonEnglish_List.append(df_input.apply(translate_NonEnglish))

		if index + 1 < n_batches:
			time.sleep(100)  # pause 100 s between batches; no pause after the last one

		print("Finished Batch "+str(index+1)+" of "+str(n_batches)+" in "+str((time.time() - start_translation_time)/60)+" Minutes")

	df_abstract_NonEnglish["Abstract"] = pd.DataFrame(pd.concat(translate_NonEnglish_List))

######################################
######## Combine
######################################

# Stack the English and translated non-English rows into one DataFrame.
# pd.concat replaces DataFrame.append, which was deprecated in pandas 1.4 and
# removed in 2.0; both concatenate the same way.
corpus_RAKE_and_GoogleAPI = pd.concat([df_abstract_English, df_abstract_NonEnglish])
# Disabled: attach the publication year and reshape into {year: {paperid: Abstract}}.
# df_year only exists when the Athena stage ran in this process, not when resuming from checkpoints.
#corpus_RAKE_and_GoogleAPI = pd.merge(corpus_RAKE_and_GoogleAPI,df_year,on=["paperid"],how="left")
#corpus_RAKE_and_GoogleAPI = corpus_RAKE_and_GoogleAPI.groupby('year')[['paperid','Abstract']].apply(lambda x: x.set_index('paperid').T.to_dict(orient='index')["Abstract"]).to_dict()


del df_abstract_English
del df_abstract_NonEnglish
gc.collect()
gc.collect()
gc.collect()

######################################
######## Output RAKE Corpus
######################################

Field_RAKE_and_GoogleAPI_Corpus_Filename = "OUTPUT_Python_MAG_Field_Corpus_RAKE_and_GoogleAPI_"+str(discipline)+".pbz2"

# bz2-compressed pickle; the `with` block closes the file.
with bz2.BZ2File(Field_RAKE_and_GoogleAPI_Corpus_Filename, 'w') as f:
	pickle.dump(corpus_RAKE_and_GoogleAPI,f, protocol=2) # Python2 only accepts 0,1, or 2

# The final corpus is on disk, so the resume checkpoints are no longer needed.
for temp_checkpoint in (TEMP_English_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename,
		TEMP_NonEnglish_Abstract_Field_RAKE_and_GoogleAPI_Corpus_Filename):
	if os.path.exists(temp_checkpoint):
		os.remove(temp_checkpoint)


print("Total Run Time: "+str((time.time() - start_time)/60)+" Minutes")
cpuStats()